/*
 * CTcpSink.cpp
 *
 *  Created on: 2016-05-13
 *      Author: terry
 */

#include "CTcpSink.h"
#include "RtpPort.h"
#include "CLog.h"
#include "CRtpPackager.h"
#include "H264RtpPackager.h"
#include "H265RtpPackager.h"
#include "RtpHeader.h"
#include <functional>
#include <errno.h>

// Receive-buffer size (4 MiB) requested on every accepted connection
// via setRecvBufferSize() in CTcpSink::handleListen().
static const int	TCP_RECV_BUFFER_SIZE = 1024 * 1024 * 4;


namespace av
{

/**
 * Constructs a closed sink.
 *
 * All members are value-initialized except the clock rate (90000) and the
 * packager, which starts as a default-constructed CRtpPackager until
 * setFormat() replaces it. The receive buffer is pre-sized to 2048 bytes.
 */
CTcpSink::CTcpSink():
		m_port(),
		m_codec(),
		m_clockRate(90000),
		m_payload(),
		m_callback(),
		m_context(),
		m_packager(new CRtpPackager())
{
	// Reserve room up front; handleRead() grows the buffer on demand.
	m_buffer.ensure(2048);
}

/**
 * Destroys the sink, calling close() first.
 */
CTcpSink::~CTcpSink()
{
	close();
}

/**
 * Binds the listening socket and starts the worker thread.
 *
 * @param port  TCP port to listen on. A value <= 0 asks the sink to pick a
 *              port itself by trying up to RtpPort::count() candidates from
 *              RtpPort::make(). An odd positive port is rounded down to the
 *              preceding even port.
 * @return 0 on success, EEXIST when no listening socket could be opened
 *         (for an explicit port as well as for automatic allocation).
 */
int CTcpSink::open(int port)
{
	bool opened = false;

	if (port <= 0)
	{
		// Automatic allocation: stop at the first candidate that binds.
		for (size_t i = 0; (i < RtpPort::count()) && !opened; ++ i)
		{
			port = RtpPort::make();
			opened = openSession(port);
		}
	}
	else
	{
		// Round an odd port down to the even port below it.
		port -= (port % 2);
		opened = openSession(port);
	}

	if (!opened)
	{
		// Never record a port or start the thread without a bound socket.
		return EEXIST;
	}

	m_port = port;

	start();

	return 0;
}

/**
 * Stops the worker thread (when it is running) and releases both sockets.
 *
 * The sockets are closed unconditionally so that a listening socket bound
 * by open() is released even when the thread is not running.
 */
void CTcpSink::close()
{
	if (isRunning())
	{
		stop();
	}

	closeSession();
}

/**
 * Reports whether the sink is usable.
 *
 * @return true only when the listening socket is open and the worker
 *         thread is running.
 */
bool CTcpSink::isOpen()
{
	if (!m_listenSocket.isOpen())
	{
		return false;
	}

	return isRunning();
}

/**
 * Installs the callback that receives media packets.
 *
 * @param cb       callback to invoke for each media packet; fireMediaPacket()
 *                 skips delivery when it tests false.
 * @param context  opaque pointer handed back to the callback unchanged.
 */
void CTcpSink::setCallback(RtpSinkCallback cb, void* context)
{
	comn::AutoCritSec guard(m_cs);

	m_context = context;
	m_callback = cb;
}

/**
 * Records the stream format and selects the matching RTP packager.
 *
 * H.264 uses H264RtpPackager; G.711 A-law and mu-law both use a CRtpPackager
 * created with MEDIA_TYPE_AUDIO; any other codec uses a CRtpPackager created
 * with MEDIA_TYPE_NONE.
 *
 * @param codec      codec identifier (one of the MEDIA_CODEC_* values).
 * @param clockRate  RTP clock rate to store for the stream.
 */
void CTcpSink::setFormat(int codec, int clockRate)
{
	comn::AutoCritSec guard(m_cs);

	m_codec = codec;
	m_clockRate = clockRate;

	const bool isH264 = (m_codec == MEDIA_CODEC_H264);
	const bool isG711 = (m_codec == MEDIA_CODEC_G711A) || (m_codec == MEDIA_CODEC_G711U);

	// A dedicated H.265 packager (H265RtpPackager) exists but is not
	// enabled; MEDIA_CODEC_HEVC therefore takes the generic path below.
	if (isH264)
	{
		m_packager.reset(new H264RtpPackager());
	}
	else if (isG711)
	{
		m_packager.reset(new CRtpPackager(MEDIA_TYPE_AUDIO));
	}
	else
	{
		m_packager.reset(new CRtpPackager(MEDIA_TYPE_NONE));
	}
}

/**
 * Intentionally a no-op: this sink ignores the requested cache size.
 *
 * @param size  unused.
 */
void CTcpSink::setCacheSize(int size)
{
	// pass
}

/**
 * @return the port recorded by the last successful open(); the constructor
 *         value-initializes it.
 */
int CTcpSink::getPort()
{
	return m_port;
}

int CTcpSink::run()
{
	int millsec = 1000;

	while (!m_canExit)
	{
	    fd_set rset;
	    FD_ZERO(&rset);
	    FD_SET(m_listenSocket.getHandle(), &rset);

	    fd_set xset;
	    FD_ZERO(&xset);
	    FD_SET(m_listenSocket.getHandle(), &xset);

	    timeval* ptv = NULL;
	    timeval tv = { millsec / 1000, (millsec % 1000) * 1000 };
	    ptv = (millsec >= 0) ? (&tv) : NULL;

	    int nfds = m_listenSocket.getHandle();
	    if (m_connSocket.isOpen())
	    {
	    	FD_SET(m_connSocket.getHandle(), &rset);
	    	FD_SET(m_connSocket.getHandle(), &xset);

	    	if (m_connSocket.getHandle() > m_listenSocket.getHandle())
	    	{
	    		nfds = m_connSocket.getHandle();
	    	}
	    }

	    int ret = select((int)nfds+1, &rset, NULL, &xset, ptv);
	    if (ret == 0)
	    {
	    	continue;
	    }
	    else if (ret < 0)
	    {
	    	m_listenSocket.close();
	    	continue;
	    }
	    else
	    {
	    	if (FD_ISSET(m_listenSocket.getHandle(), &rset))
			{
	    		handleListen();
			}
	    	else if (FD_ISSET(m_listenSocket.getHandle(), &xset))
	    	{
	    		CLog::error("CTcpSink listen exception.\n");
	    	}
	    	else if (m_connSocket.isOpen())
	    	{
	    		if (FD_ISSET(m_connSocket.getHandle(), &rset))
	    		{
	    			handleRead();
	    		}
	    		else if (FD_ISSET(m_connSocket.getHandle(), &xset))
	    		{
	    			handleClose();
	    		}
	    	}
	    }
	}
	return 0;
}

void CTcpSink::doStop()
{
	comn::Socket sock;
	sock.open(SOCK_STREAM);
	sock.setNonblock(true);
	sock.connect(comn::SockAddr("127.0.0.1", m_port));
	sock.close();
}

/**
 * (Re)creates the listening socket on the given port.
 *
 * Any previously open listening socket is closed first. When bind() fails
 * the freshly opened socket is closed again.
 *
 * @param port  TCP port to bind to.
 * @return true when the socket was opened and bound and listen() was
 *         called; false when opening or binding failed.
 */
bool CTcpSink::openSession(int port)
{
	m_listenSocket.close();

	if (!m_listenSocket.open(SOCK_STREAM))
	{
		return false;
	}

	int bindResult = m_listenSocket.bind(comn::SockAddr(NULL, port));
	if (bindResult != 0)
	{
		m_listenSocket.close();
		return false;
	}

	m_listenSocket.listen();
	return true;
}

/**
 * Closes the listening socket and then the client connection socket.
 */
void CTcpSink::closeSession()
{
	m_listenSocket.close();

	m_connSocket.close();

}

/**
 * Delivers one media packet to the registered callback, if any.
 *
 * The callback and its context are copied while holding m_cs, the same lock
 * setCallback() takes when writing them, so the pair is always read
 * consistently. The callback itself runs after the lock is released, so it
 * may call setCallback() without deadlocking.
 *
 * @param packet  packet whose data, size, pts and flags are forwarded.
 */
void CTcpSink::fireMediaPacket(MediaPacket& packet)
{
    RtpSinkCallback cb = RtpSinkCallback();
    void* context = NULL;

    {
        comn::AutoCritSec lock(m_cs);
        cb = m_callback;
        context = m_context;
    }

    if (cb)
    {
        (*cb)(this, packet.data, packet.size, packet.pts, packet.flags, context);
    }
}

void CTcpSink::handleListen()
{
	comn::SockAddr addr;
	comn::Socket sock = m_listenSocket.accept(addr);
	if (!sock.isOpen())
	{
		return;
	}

	sock.setRecvBufferSize(TCP_RECV_BUFFER_SIZE);

	if (m_connSocket.isOpen())
	{
		CLog::warning("CTcpSink. connection already exist. and new client is coming(%s:%d).\n", addr.getIPAddr(), addr.getPort());
		handleClose();
	}
	else
	{
		CLog::warning("CTcpSink. new client is coming(%s:%d).\n", addr.getIPAddr(), addr.getPort());
	}

	m_connSocket = sock;
}

void CTcpSink::handleRead()
{
	if (m_buffer.length() < sizeof(int))
	{
		int count = sizeof(int) - m_buffer.length();
		int ret = m_connSocket.receive((char*)m_buffer.data() + m_buffer.length(), count);
		if (ret <= 0)
		{
			m_buffer.clear();
			handleClose();
			return;
		}
		else
		{
			m_buffer.resize(m_buffer.length() + ret);
			if (m_buffer.length() < sizeof(int))
			{
				return;
			}
		}
	}

	int32_t rtpLength = *((int32_t*)m_buffer.data());
	rtpLength = ntohl(rtpLength);

	m_buffer.ensure(rtpLength + sizeof(int32_t));

	int count = rtpLength - (m_buffer.length() - sizeof(int32_t));
	int ret = m_connSocket.receive((char*)m_buffer.data() + m_buffer.length(), count);
	if (ret <= 0)
	{
		m_buffer.clear();
		handleClose();
		return;
	}
	else
	{
		m_buffer.resize(m_buffer.length() + ret);
		if (ret == count)
		{
			RTP_FIXED_HEADER* pRtpHeader = (RTP_FIXED_HEADER*)(m_buffer.data() + sizeof(int));
			int pktLength = rtpLength - sizeof(RTP_FIXED_HEADER);

			RtpPacket pkt;
			memset(&pkt, 0, sizeof(pkt));
			pkt.mark = pRtpHeader->marker;
			pkt.ts = ntohl(pRtpHeader->timestamp);
			pkt.data = (uint8_t*)pRtpHeader + sizeof(RTP_FIXED_HEADER);
			pkt.size = pktLength;
			pkt.pt = pRtpHeader->payload;

			MediaPacket pktOut;
			if (m_packager->join(pkt, pktOut))
			{
				fireMediaPacket(pktOut);
			}

			m_buffer.clear();
		}
	}
}

void CTcpSink::handleClose()
{
	comn::SockAddr addr = m_connSocket.getPeerName();
	CLog::debug("CTcpSink connection closed. %s:%d\n", addr.getIPAddr(), addr.getPort());

	m_connSocket.close();
}



} /* namespace av */
